#!/bin/sh

# Prepare the fixtures used by the restore checks:
#   1. install MinIO with TLS enabled in the backup namespace and create the
#      source cluster there with volume snapshots disabled;
#   2. insert the first batch of mock data and take an SGBackup of it;
#   3. unless K8S_DISABLE_VOLUME_SNAPSHOT is "true", switch the source cluster
#      to volume snapshots, take a second SGBackup and switch it back;
#   4. record BACKUP_PITR_DATE between WAL rotations, then insert more data;
#   5. create $CLUSTER_NAMESPACE and install the "configurations" release in
#      it with cluster.create=false.
# Every variable assigned here is deliberately global: the check_* functions
# and e2e_test_uninstall read them later.
e2e_test_install() {
  BACKUP_CLUSTER_NAMESPACE="$CLUSTER_NAMESPACE-backup"
  BACKUP_CLUSTER_NAME="$(get_sgcluster_name "$CLUSTER_NAME-backup")"
  MINIO_NAME="$CLUSTER_NAME-backup-minio"
  BUCKET_NAME="$CLUSTER_NAME-backup"
  MINIO_NAMESPACE="$BACKUP_CLUSTER_NAMESPACE"
  install_minio \
    --set tls.enabled=true \
    --set-string tls.certSecret=restore-backup-minio-certs \
    --set-string tls.publicCrt=tls.crt \
    --set-string tls.privateKey=tls.key

  BACKUP_ENDPOINT="https://restore-backup-minio.$BACKUP_CLUSTER_NAMESPACE:9000"
  create_or_replace_cluster_only "$BACKUP_CLUSTER_NAME" "$BACKUP_CLUSTER_NAMESPACE" 1 \
    -f "$SPEC_FILE.backup.values.yaml" \
    --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION" \
    --set-string "configurations.objectstorage.s3Compatible.endpoint=$BACKUP_ENDPOINT" \
    --set cluster.configurations.backups.useVolumeSnapshot=false

  wait_pods_running "$BACKUP_CLUSTER_NAMESPACE" 1 "$BACKUP_CLUSTER_NAME-[0-9]\+"
  wait_cluster "$BACKUP_CLUSTER_NAME" "$BACKUP_CLUSTER_NAMESPACE"
  switch_cluster_to_first "$BACKUP_CLUSTER_NAME" "$BACKUP_CLUSTER_NAMESPACE"

  create_mock_data

  CLUSTER_SNAPSHOT_NAME="$(get_sgcluster_name "${SPEC_NAME}-snapshot")"

  # The plain and the volume snapshot backups use different name prefixes
  # (backup-0 / backup-1): both source SGBackups are created in
  # $BACKUP_CLUSTER_NAMESPACE and both copies in $CLUSTER_NAMESPACE, so two
  # equal random suffixes must not produce the same resource name.
  BACKUP_SOURCE_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-0-$(shuf -i 0-65535 -n 1)")"
  BACKUP_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-0-$(shuf -i 0-65535 -n 1)")"
  BACKUP_SOURCE_VOLUME_SNAPSHOT_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-1-$(shuf -i 0-65535 -n 1)")"
  BACKUP_VOLUME_SNAPSHOT_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-1-$(shuf -i 0-65535 -n 1)")"
  ALTER_CLUSTER_NAME="$(get_sgcluster_name "$CLUSTER_NAME-alter")"

  cat << EOF | kubectl create -f -
apiVersion: stackgres.io/v1
kind: SGBackup
metadata:
  namespace: "$BACKUP_CLUSTER_NAMESPACE"
  name: "$BACKUP_SOURCE_NAME"
spec:
  sgCluster: "$BACKUP_CLUSTER_NAME"
  managedLifecycle: false
EOF

  wait_until is_backup_phase "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_NAME" "Completed"

  if [ "$K8S_DISABLE_VOLUME_SNAPSHOT" != true ]
  then
    create_or_replace_cluster_only "$BACKUP_CLUSTER_NAME" "$BACKUP_CLUSTER_NAMESPACE" 1 \
      -f "$SPEC_FILE.backup.values.yaml" \
      --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION" \
      --set-string "configurations.objectstorage.s3Compatible.endpoint=$BACKUP_ENDPOINT" \
      --set cluster.configurations.backups.useVolumeSnapshot=true

    # NOTE(review): jq without -e exits 0 whatever boolean it prints, and a
    # batch/v1 CronJob keeps its pod template under .spec.jobTemplate, so this
    # wait (and the one after the second backup) may pass without the flag
    # being observed - confirm the intent.
    wait_until eval 'kubectl get cronjob -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_CLUSTER_NAME" -o json \
      | jq ".spec.template.spec.containers[0].env | any(.name == \"USE_VOLUME_SNAPSHOT\" and .value == \"true\")"'

    cat << EOF | kubectl create -f -
apiVersion: stackgres.io/v1
kind: SGBackup
metadata:
  namespace: "$BACKUP_CLUSTER_NAMESPACE"
  name: "$BACKUP_SOURCE_VOLUME_SNAPSHOT_NAME"
spec:
  sgCluster: "$BACKUP_CLUSTER_NAME"
  managedLifecycle: false
EOF

    wait_until is_backup_phase "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_VOLUME_SNAPSHOT_NAME" "Completed"

    create_or_replace_cluster_only "$BACKUP_CLUSTER_NAME" "$BACKUP_CLUSTER_NAMESPACE" 1 \
      -f "$SPEC_FILE.backup.values.yaml" \
      --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION" \
      --set-string "configurations.objectstorage.s3Compatible.endpoint=$BACKUP_ENDPOINT" \
      --set cluster.configurations.backups.useVolumeSnapshot=false

    wait_until eval 'kubectl get cronjob -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_CLUSTER_NAME" -o json \
      | jq ".spec.template.spec.containers[0].env | any(.name == \"USE_VOLUME_SNAPSHOT\" and .value == \"false\")"'
  fi

  rotate_wal_file

  # Options go before the format operand so this does not depend on GNU
  # option permutation.
  BACKUP_PITR_DATE="$(date -u +%Y-%m-%dT%H:%M:%SZ)"

  rotate_wal_file

  create_more_mock_data

  rotate_wal_file

  kubectl create ns "$CLUSTER_NAMESPACE"

  create_or_replace_cluster "configurations" "$CLUSTER_NAMESPACE" 0 \
    --set cluster.create=false \
    --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION"
}

# Succeed when the SGBackup reports the expected .status.process.status.
#   $1 - namespace of the SGBackup
#   $2 - name of the SGBackup
#   $3 - status value to compare against
is_backup_phase() {
  local BACKUP_NAMESPACE="$1" BACKUP="$2" EXPECTED_STATUS="$3"
  test "$(kubectl get sgbackup -n "$BACKUP_NAMESPACE" "$BACKUP" \
    -o=jsonpath='{.status.process.status}')" = "$EXPECTED_STATUS"
}

# Tear down what the spec installed. Every step is best-effort (|| true) so a
# failing cleanup does not prevent the remaining ones from running.
e2e_test_uninstall() {
  helm_cleanup_chart "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" || true
  # The backup source cluster is installed in $BACKUP_CLUSTER_NAMESPACE (see
  # e2e_test_install), so its release has to be cleaned up there.
  helm_cleanup_chart "$BACKUP_CLUSTER_NAME" "$BACKUP_CLUSTER_NAMESPACE" || true

  k8s_async_cleanup_namespace "$CLUSTER_NAMESPACE" || true
  k8s_async_cleanup_namespace "$BACKUP_CLUSTER_NAMESPACE" || true
}

# Run the restore checks in order through run_test. The volume snapshot check
# is replaced by a skip message when K8S_DISABLE_VOLUME_SNAPSHOT is "true".
e2e_test() {
  run_test "Check that backup can be copied across namespaces" \
    check_backup_can_be_copied_from_another_namespace
  run_test "Check that backup can be restored" \
    check_backup_can_be_restored

  if [ "$K8S_DISABLE_VOLUME_SNAPSHOT" = true ]
  then
    echo "Skipping Check that volume snapshot backup can be restored since volume snapshot feature is not available"
  else
    run_test "Check that volume snapshot backup can be restored" \
      check_volume_snapshot_backup_can_be_restored
  fi

  run_test "Check that restore using PITR" \
    check_restore_using_pitr
  run_test "Check that after deleting original backup the backup is not deleted if a copy exists" \
    check_delete_original_backup_dont_delete_backup_if_copy_exists
  run_test "Check that after deleting backup copy the backup is deleted" \
    check_delete_backup_copy_delete_backup
  run_test "Check that cluster keep working after backup is removed" \
    check_delete_backup_dont_require_restart
}

# Create the "test" database and the fibonacci table on instance 0 of the
# backup source cluster, then insert the values 1, 2 and 3.
create_mock_data() {
  local NUM
  run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" \
    -q "CREATE DATABASE test;"
  run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" -d "test" \
    -q "CREATE TABLE fibonacci(num integer);"
  for NUM in 1 2 3
  do
    run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" -d "test" \
      -q "INSERT INTO fibonacci(num) VALUES ($NUM);"
  done
}

# Insert the values 5, 8 and 13 into the fibonacci table on instance 0 of the
# backup source cluster.
create_more_mock_data() {
  local NUM
  for NUM in 5 8 13
  do
    run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" -d "test" \
      -q "INSERT INTO fibonacci(num) VALUES ($NUM);"
  done
}

# Record the current WAL file name of the backup source cluster in the global
# CURRENT_WAL_FILE, issue a CHECKPOINT, wait until pg_switch_wal() reports a
# different file name, and finally wait until the recorded file can be
# fetched with wal-g from inside the patroni container of instance 0.
# Timeouts are derived from E2E_TIMEOUT.
rotate_wal_file() {
  local SWITCH_TIMEOUT="$((E2E_TIMEOUT / 5))"
  local FETCH_TIMEOUT="$((E2E_TIMEOUT / 20))"

  CURRENT_WAL_FILE="$(
    run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" -d "test" \
      -q "SELECT r.file_name from pg_walfile_name_offset(pg_current_wal_lsn()) as r"
    )"

  run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" -d "test" \
    -q "CHECKPOINT;"

  # The quoted script is evaluated by wait_until on every attempt, so it has to
  # read CURRENT_WAL_FILE at that time.
  wait_until -t "$SWITCH_TIMEOUT" eval '[ "$(run_query -p 5432 -i 0 -n "$BACKUP_CLUSTER_NAMESPACE" -c "$BACKUP_CLUSTER_NAME" -d "test" \
    -q "SELECT r.file_name from pg_walfile_name_offset(pg_switch_wal()) as r")" != "$CURRENT_WAL_FILE" ]'

  wait_until -t "$SWITCH_TIMEOUT" \
    timeout -s KILL "$FETCH_TIMEOUT" \
    kubectl exec -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_CLUSTER_NAME-0" -c patroni -- \
    exec-with-env backup -- \
    wal-g wal-fetch "$CURRENT_WAL_FILE" "/tmp/$CURRENT_WAL_FILE"
}

# Copy the backups taken in $BACKUP_CLUSTER_NAMESPACE into $CLUSTER_NAMESPACE:
#   * the MinIO secret and its "-certs" secret are re-created in the target
#     namespace;
#   * the plain SGBackup is re-created as $BACKUP_NAME with spec.sgCluster
#     prefixed by "<backup namespace>.";
#   * unless K8S_DISABLE_VOLUME_SNAPSHOT is "true", the volume snapshot
#     SGBackup is re-created as $BACKUP_VOLUME_SNAPSHOT_NAME together with a
#     VolumeSnapshot and a VolumeSnapshotContent of that name which reuse the
#     snapshotHandle of the source content; the function then waits for the
#     new VolumeSnapshot to report readyToUse.
check_backup_can_be_copied_from_another_namespace() {
  kubectl get secret -n "$BACKUP_CLUSTER_NAMESPACE" "$MINIO_NAME" -o json \
    | jq ".metadata.namespace = \"$CLUSTER_NAMESPACE\"" \
    | kubectl create -f -
  kubectl get secret -n "$BACKUP_CLUSTER_NAMESPACE" "$MINIO_NAME"-certs -o json \
    | jq ".metadata.namespace = \"$CLUSTER_NAMESPACE\"" \
    | kubectl create -f -
  TMP_BACKUP="$(kubectl get sgbackups.stackgres.io -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_NAME" -o json \
    | jq ".metadata.namespace = \"$CLUSTER_NAMESPACE\" | .metadata.name = \"$BACKUP_NAME\"" \
    | jq ".spec.sgCluster = \"$BACKUP_CLUSTER_NAMESPACE.\" + .spec.sgCluster")"

  printf %s "$TMP_BACKUP" | kubectl create -f -

  # e2e_test_install only takes the volume snapshot backup when the feature is
  # available, so there is nothing to copy (and nothing to wait for) otherwise.
  if [ "$K8S_DISABLE_VOLUME_SNAPSHOT" = true ]
  then
    echo "Skipping copy of volume snapshot backup since volume snapshot feature is not available"
    return
  fi

  TMP_BACKUP="$(kubectl get sgbackups.stackgres.io -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_VOLUME_SNAPSHOT_NAME" -o json \
    | jq ".metadata.namespace = \"$CLUSTER_NAMESPACE\" | .metadata.name = \"$BACKUP_VOLUME_SNAPSHOT_NAME\"" \
    | jq ".status.volumeSnapshot.name = \"$BACKUP_VOLUME_SNAPSHOT_NAME\"" \
    | jq ".spec.sgCluster = \"$BACKUP_CLUSTER_NAMESPACE.\" + .spec.sgCluster")"

  printf %s "$TMP_BACKUP" | kubectl create -f -

  # The copied VolumeSnapshot points at a VolumeSnapshotContent of the same
  # name, which is created right below.
  TMP_VOLUME_SNAPSHOT="$(kubectl get volumesnapshot -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_VOLUME_SNAPSHOT_NAME" -o json \
    | jq ".metadata.namespace = \"$CLUSTER_NAMESPACE\" | .metadata.name = \"$BACKUP_VOLUME_SNAPSHOT_NAME\"" \
    | jq "del(.metadata.ownerReferences)" \
    | jq ".spec.source = { volumeSnapshotContentName: \"$BACKUP_VOLUME_SNAPSHOT_NAME\" }" \
    | jq "del(.status)")"

  printf %s "$TMP_VOLUME_SNAPSHOT" | kubectl create -f -

  BACKUP_SOURCE_VOLUME_SNAPSHOT_CONTENT_NAME="$(
    kubectl get volumesnapshot -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_VOLUME_SNAPSHOT_NAME" \
      --template '{{ .status.boundVolumeSnapshotContentName }}')"
  # The uid of the VolumeSnapshot just created is needed for the
  # volumeSnapshotRef of the new content.
  BACKUP_VOLUME_SNAPSHOT_UID="$(
    kubectl get volumesnapshot -n "$CLUSTER_NAMESPACE" "$BACKUP_VOLUME_SNAPSHOT_NAME" \
      --template '{{ .metadata.uid }}')"
  TMP_VOLUME_SNAPSHOT_CONTENT="$(kubectl get volumesnapshotcontent "$BACKUP_SOURCE_VOLUME_SNAPSHOT_CONTENT_NAME" -o json \
    | jq ".metadata.name = \"$BACKUP_VOLUME_SNAPSHOT_NAME\"" \
    | jq ".spec.source = { snapshotHandle: .status.snapshotHandle }" \
    | jq ".spec.volumeSnapshotRef = { apiVersion: \"snapshot.storage.k8s.io/v1\", kind: \"VolumeSnapshot\", name: \"$BACKUP_VOLUME_SNAPSHOT_NAME\", namespace: \"$CLUSTER_NAMESPACE\", uid: \"$BACKUP_VOLUME_SNAPSHOT_UID\"  }" \
    | jq "del(.status)")"

  printf %s "$TMP_VOLUME_SNAPSHOT_CONTENT" | kubectl create -f -

  wait_until eval 'kubectl get volumesnapshot -n "$CLUSTER_NAMESPACE" "$BACKUP_VOLUME_SNAPSHOT_NAME" --template "{{ .status.readyToUse }}" | grep -qxF true'
}

# Verify that the copied plain backup carries no volume snapshot name, restore
# a two-instance cluster from it and run the data and replication checks.
# The restored cluster is removed afterwards unless E2E_SKIP_SPEC_UNINSTALL is
# "true".
check_backup_can_be_restored() {
  if ! kubectl get sgbackup -n "$CLUSTER_NAMESPACE" "$BACKUP_NAME" \
    --template '{{ .status.volumeSnapshot.name }}' \
    | grep -qxF "<no value>"
  then
    fail "backup performed using volume snapshot"
  else
    success "backup not performed using volume snapshot"
  fi

  create_or_replace_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" 2 \
    --set configurations.create=false \
    --set instanceProfiles=null \
    --set cluster.initialData.restore.fromBackup.name="$BACKUP_NAME" \
    --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION"

  wait_pods_running "$CLUSTER_NAMESPACE" 2 "$CLUSTER_NAME-[0-9]\+"
  wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  switch_cluster_to_first "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  check_init_data_after_restore
  check_replica_data_after_restore
  check_replication_after_restore

  case "$E2E_SKIP_SPEC_UNINSTALL" in
    true) ;;
    *) remove_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" ;;
  esac
}

# Verify that the copied volume snapshot backup references its snapshot,
# restore a two-instance cluster from it and run the data and replication
# checks. The cluster is then scaled to one instance, a new SGBackup of it is
# taken, its pods are deleted and the function waits for it to run again.
# CLUSTER_NAME is shadowed locally with CLUSTER_SNAPSHOT_NAME so the shared
# check_* helpers act on the snapshot cluster. Sets the global BACKUP_NEW_NAME.
# The cluster is removed afterwards unless E2E_SKIP_SPEC_UNINSTALL is "true".
check_volume_snapshot_backup_can_be_restored() {
  local CLUSTER_NAME="$CLUSTER_SNAPSHOT_NAME"

  if ! kubectl get sgbackup -n "$CLUSTER_NAMESPACE" "$BACKUP_VOLUME_SNAPSHOT_NAME" \
    --template '{{ .status.volumeSnapshot.name }}' \
    | grep -qxF "$BACKUP_VOLUME_SNAPSHOT_NAME"
  then
    fail "backup not performed using volume snapshot"
  else
    success "backup performed using volume snapshot"
  fi

  # Restore with two instances.
  create_or_replace_cluster_only "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" 2 \
    --set configurations.postgresconfig.create=false \
    --set configurations.poolingconfig.create=false \
    --set instanceProfiles=null \
    --set cluster.initialData.restore.fromBackup.name="$BACKUP_VOLUME_SNAPSHOT_NAME" \
    --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION" \
    -f "$SPEC_FILE.backup.values.yaml" \
    --set-string "configurations.objectstorage.s3Compatible.endpoint=$BACKUP_ENDPOINT" \
    --set cluster.configurations.backups.useVolumeSnapshot=true

  wait_pods_running "$CLUSTER_NAMESPACE" 2 "$CLUSTER_NAME-[0-9]\+"
  wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  switch_cluster_to_first "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  check_init_data_after_restore
  check_replica_data_after_restore
  check_replication_after_restore

  # Same release values, but with a single instance.
  create_or_replace_cluster_only "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" 1 \
    --set configurations.postgresconfig.create=false \
    --set configurations.poolingconfig.create=false \
    --set instanceProfiles=null \
    --set cluster.initialData.restore.fromBackup.name="$BACKUP_VOLUME_SNAPSHOT_NAME" \
    --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION" \
    -f "$SPEC_FILE.backup.values.yaml" \
    --set-string "configurations.objectstorage.s3Compatible.endpoint=$BACKUP_ENDPOINT" \
    --set cluster.configurations.backups.useVolumeSnapshot=true

  wait_pods_terminated "$CLUSTER_NAMESPACE" 1 "$CLUSTER_NAME-[0-9]\+"

  BACKUP_NEW_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-2-$(shuf -i 0-65535 -n 1)")"

  kubectl create -f - << EOF
apiVersion: stackgres.io/v1
kind: SGBackup
metadata:
  namespace: "$CLUSTER_NAMESPACE"
  name: "$BACKUP_NEW_NAME"
spec:
  sgCluster: "$CLUSTER_NAME"
  managedLifecycle: false
EOF

  wait_until is_backup_phase "$CLUSTER_NAMESPACE" "$BACKUP_NEW_NAME" "Completed"

  kubectl delete pod -n "$CLUSTER_NAMESPACE" \
    -l app=StackGresCluster,stackgres.io/cluster-name="$CLUSTER_NAME",stackgres.io/cluster=true

  wait_pods_running "$CLUSTER_NAMESPACE" 1 "$CLUSTER_NAME-[0-9]\+"
  wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  switch_cluster_to_first "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  case "$E2E_SKIP_SPEC_UNINSTALL" in
    true) ;;
    *) remove_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" ;;
  esac
}

# Query the fibonacci table through the "$CLUSTER_NAME" host and report
# success when the rows, joined without separators, read 1235813.
check_init_data_after_restore() {
  local PRIMARY_ROWS
  PRIMARY_ROWS="$(run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "test" \
    -q "SELECT num FROM fibonacci ORDER BY num;")"

  if [ "$(printf '%s' "$PRIMARY_ROWS" | tr -d '\n')" = "1235813" ]
  then
    success "restore primary db restored successfully"
  else
    fail "primary db not restored successfully"
  fi
}

# Wait (through try_function) until the "$CLUSTER_NAME-replicas" host answers
# a query, then report success when the fibonacci rows read from it, joined
# without separators, are 1235813. Any other outcome is reported as a failure.
check_replica_data_after_restore() {
  # RESULT and EXIT_CODE are local here so that what try_function stores in
  # them stays inside this function.
  local EXIT_CODE RESULT
  local REPLICA_ROWS

  try_function wait_until run_query -p 5432 -i 0 -h "$CLUSTER_NAME-replicas" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "test" -q 'SELECT 1'

  if "$RESULT"
  then
    REPLICA_ROWS="$(run_query -p 5432 -i 0 -h "$CLUSTER_NAME-replicas" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "test" \
      -q "SELECT num FROM fibonacci ORDER BY num;")"
    if [ "$(printf '%s' "$REPLICA_ROWS" | tr -d '\n')" = "1235813" ]
    then
      success "restore replica db restored successfully"
    else
      fail "replica db not restored successfully"
    fi
  else
    fail "replica db not restored successfully"
  fi
}

# Insert 21, 34 and 55 through the "$CLUSTER_NAME" host, confirm the primary
# returns the full sequence and then wait until the "$CLUSTER_NAME-replicas"
# host returns the same rows.
check_replication_after_restore() {
  local PRIMARY_RESPONSE
  local NUM

  for NUM in 21 34 55
  do
    run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" \
      -q "INSERT INTO fibonacci(num) VALUES ($NUM);" -d test
  done

  PRIMARY_RESPONSE="$(run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "test" -q "SELECT num FROM fibonacci ORDER BY num;" | tr -d '\n')"

  if [ "$PRIMARY_RESPONSE" != "1235813213455" ]
  then
    fail "inserts on the primary where not successful."
    return
  fi

  # The quoted script is evaluated by wait_until and reads PRIMARY_RESPONSE.
  if ! wait_until eval 'run_query -p 5432 -i 0 -h "$CLUSTER_NAME-replicas" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "test" -q "SELECT num FROM fibonacci ORDER BY num;" | tr -d "\n" | grep -qxF "$PRIMARY_RESPONSE"'
  then
    fail "replication is not working. The records don't match between primary and replica for the fibonacci table"
    return
  fi

  success "replication is working"
  return 0
}

# Restore a two-instance cluster from $BACKUP_NAME up to $BACKUP_PITR_DATE and
# report success when the fibonacci rows read from instance 0, joined without
# separators, are 123.
# When E2E_SKIP_SPEC_UNINSTALL is "true" the cluster is removed first, before
# it is created again.
check_restore_using_pitr() {
  local PITR_ROWS

  case "$E2E_SKIP_SPEC_UNINSTALL" in
    true) remove_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" ;;
  esac

  create_or_replace_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" 2 \
    --set configurations.create=false \
    --set instanceProfiles=null \
    --set-string "cluster.postgres.version=$E2E_POSTGRES_VERSION" \
    --set-string cluster.initialData.restore.fromBackup.name="$BACKUP_NAME" \
    --set-string cluster.initialData.restore.fromBackup.pointInTimeRecovery.restoreToTimestamp="$BACKUP_PITR_DATE"

  wait_pods_running "$CLUSTER_NAMESPACE" 2 "$CLUSTER_NAME-[0-9]\+"
  wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  switch_cluster_to_first "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  PITR_ROWS="$(run_query -p 5432 -i 0 -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "test" \
    -q "SELECT num FROM fibonacci ORDER BY num;")"

  if [ "$(printf '%s' "$PITR_ROWS" | tr -d '\n')" = "123" ]
  then
    success "restore replica db restored successfully using PITR"
  else
    fail "replica db not restored successfully using PITR"
  fi
}

# Delete the source SGBackup, take a new SGBackup of the source cluster and
# report success when wal-g still lists the internal name of the deleted
# SGBackup. Sets the globals BACKUP_INTERNAL_NAME, BACKUP_NEW_NAME and
# BACKUP_LIST.
check_delete_original_backup_dont_delete_backup_if_copy_exists() {
  # The internal name has to be read before the SGBackup is deleted.
  BACKUP_INTERNAL_NAME="$(
    kubectl get sgbackups.stackgres.io -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_NAME" \
      --template '{{ .status.internalName }}'
    )"
  kubectl delete sgbackups.stackgres.io -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_SOURCE_NAME"

  BACKUP_NEW_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-2-$(shuf -i 0-65535 -n 1)")"

  kubectl create -f - << EOF
apiVersion: stackgres.io/v1
kind: SGBackup
metadata:
  namespace: "$BACKUP_CLUSTER_NAMESPACE"
  name: "$BACKUP_NEW_NAME"
spec:
  sgCluster: "$BACKUP_CLUSTER_NAME"
  managedLifecycle: false
EOF

  wait_until is_backup_phase "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_NEW_NAME" "Completed"

  BACKUP_LIST="$(
    kubectl exec -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_CLUSTER_NAME-0" -c patroni -- \
      exec-with-env backup -- wal-g backup-list --json 2>/dev/null
    )"

  if ! echo "$BACKUP_LIST" | jq -r '.[].backup_name' | grep -qxF "$BACKUP_INTERNAL_NAME"
  then
    fail "Backup was removed when a copy still exists"
  else
    success "Backup was not removed when a copy still exists"
  fi
}

# Delete the copied SGBackup from $CLUSTER_NAMESPACE, take a new SGBackup of
# the source cluster and report success when wal-g no longer lists the
# internal name of the deleted copy. Sets the globals BACKUP_INTERNAL_NAME,
# BACKUP_NEW_NAME and BACKUP_LIST.
check_delete_backup_copy_delete_backup() {
  # The internal name has to be read before the SGBackup is deleted.
  BACKUP_INTERNAL_NAME="$(
    kubectl get sgbackups.stackgres.io -n "$CLUSTER_NAMESPACE" "$BACKUP_NAME" \
      --template '{{ .status.internalName }}'
    )"
  kubectl delete sgbackups.stackgres.io -n "$CLUSTER_NAMESPACE" "$BACKUP_NAME"

  BACKUP_NEW_NAME="$(get_sgbackup_name "${CLUSTER_NAME}-backup-3-$(shuf -i 0-65535 -n 1)")"

  kubectl create -f - << EOF
apiVersion: stackgres.io/v1
kind: SGBackup
metadata:
  namespace: "$BACKUP_CLUSTER_NAMESPACE"
  name: "$BACKUP_NEW_NAME"
spec:
  sgCluster: "$BACKUP_CLUSTER_NAME"
  managedLifecycle: false
EOF

  wait_until is_backup_phase "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_NEW_NAME" "Completed"

  BACKUP_LIST="$(
    kubectl exec -n "$BACKUP_CLUSTER_NAMESPACE" "$BACKUP_CLUSTER_NAME-0" -c patroni -- \
      exec-with-env backup -- wal-g backup-list --json 2>/dev/null
    )"

  if ! echo "$BACKUP_LIST" | jq -r '.[].backup_name' | grep -qxF "$BACKUP_INTERNAL_NAME"
  then
    success "Backup was removed when no copy exists"
  else
    fail "Backup was not removed when no copy exists"
  fi
}

# Check that the restored cluster is still operable:
#   * it must not reach the PendingRestart condition within the short wait;
#   * patching spec.instances to 1 must bring the StatefulSet to one replica;
#   * after deleting pod "$CLUSTER_NAME-0" a pod must run again and
#     wait_cluster must succeed.
check_delete_backup_dont_require_restart() {
  # Here a successful wait means the condition was observed, i.e. a failure.
  if wait_until -t 16 eval '
    kubectl wait sgclusters.stackgres.io -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" \
      --for=condition=PendingRestart --timeout 0'
  then
    fail "Cluster is pending restart."
  else
    success "Cluster is not pending restart."
  fi

  kubectl patch sgclusters.stackgres.io -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" \
    --type json \
    --patch '[{"op":"replace","path":"/spec/instances","value":1}]'

  if ! wait_until eval '[ "$(kubectl get sts -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" -o=jsonpath="{.status.replicas}")" -eq 1 ]'
  then
    fail "Cluster is not reconciled."
  else
    success "Cluster is still reconciled."
  fi

  kubectl delete pod -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0"

  # try_function reports its outcome through RESULT (and EXIT_CODE).
  local RESULT EXIT_CODE

  try_function wait_pods_running "$CLUSTER_NAMESPACE" 1 "$CLUSTER_NAME-[0-9]\+"
  if ! "$RESULT"
  then
    fail "Cluster can not be restarted."
  else
    success "Cluster can be restarted."
  fi

  try_function wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  if ! "$RESULT"
  then
    fail "Cluster primary has not been elected."
  else
    success "Cluster primary has been elected."
  fi
}
